package com.github.mall.common.kafka.consumer.impl;

import cn.hutool.core.lang.Dict;
import cn.hutool.extra.mail.MailUtil;
import cn.hutool.json.JSONUtil;
import com.github.mall.common.kafka.consumer.IKafkaConsumer;
import com.github.mall.common.kafka.entity.Message;
import freemarker.template.Configuration;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.ui.freemarker.FreeMarkerTemplateUtils;

import java.util.Optional;

@Component
@AllArgsConstructor
@Slf4j
public class UserMailKafkaConsumer implements IKafkaConsumer {

  private final Configuration freemarkerConfiguration;

  @Override
  @KafkaListener(topics = {"ums_mail_topic"})
  @SneakyThrows
  public void listen(ConsumerRecord<?, ?> record) {
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
      Message message = JSONUtil.toBean(JSONUtil.parseObj(kafkaMessage.get()), Message.class);
      Dict userMailParam = JSONUtil.toBean(JSONUtil.parseObj(message.getData()), Dict.class);
      String userMail = userMailParam.getStr("userMail");
      String subject = userMailParam.getStr("subject");
      String content = FreeMarkerTemplateUtils.processTemplateIntoString(
        this.freemarkerConfiguration.getTemplate("mail.ftl"), userMailParam);
      MailUtil.send(userMail, subject, content, true);
      if (log.isDebugEnabled()) {
        log.debug("邮箱发送成功，发送参数:"+userMailParam);
      }
    }
  }
}
